OpenTelemetry Collector Extension編
Extension はパイプライン外で動作する、Collector に機能を追加するコンポーネントです。Receiver/Processor/Exporter のデータパイプラインには直接関与せず、認証・認可、ヘルスチェック、永続化ストレージなどの補助機能を提供します。
Extension の基本 Interface は component.Component を埋め込んだシンプルな定義です:
code:go
// Extension is the interface for objects hosted by the OpenTelemetry Collector that
// don't participate directly on data pipelines but provide some functionality
// to the service, examples: health check endpoint, z-pages, etc.
type Extension interface {
component.Component
}
Extension は必要に応じて追加の Interface を実装できます
code:go
// Dependent は他の Extension に依存する Extension が実装する
// 依存先が先に起動されることを保証する
type Dependent interface {
extension.Extension
Dependencies() []component.ID
}
// PipelineWatcher は Pipeline の状態変化に関心がある Extension が実装する
// 例: K8s readiness probe
type PipelineWatcher interface {
// Ready は全 Pipeline が起動完了したときに呼ばれる
Ready() error
// NotReady は Pipeline が停止を開始する前に呼ばれる
NotReady() error
}
// ConfigWatcher は Collector の設定変更に関心がある Extension が実装する
type ConfigWatcher interface {
NotifyConfig(ctx context.Context, conf *confmap.Conf) error
}
認証・認可機能を提供する Extension です。Server 側(Receiver)と Client 側(Exporter)の両方に対応します。
code:go
// Server 側(Receiver で使用)
type Server interface {
Authenticate(ctx context.Context, sources mapstring[]string) (context.Context, error) }
// Client 側(Exporter で使用)
type HTTPClient interface {
RoundTripper(base http.RoundTripper) (http.RoundTripper, error)
}
type GRPCClient interface {
PerRPCCredentials() (credentials.PerRPCCredentials, error)
}
たとえば basicauth の contrib を使うと Receiver へ user/pass の認証をつけられます。
code:yaml
extensions:
basicauth/server:
htpasswd:
file: .htpasswd
inline: |
${env:BASIC_AUTH_USERNAME}:${env:BASIC_AUTH_PASSWORD}
receivers:
otlp:
protocols:
http:
auth:
authenticator: basicauth/server
HTTP/gRPC 接続にカスタムロジックを注入する Extension です。
code:go
// Server 側
type HTTPServer interface {
GetHTTPHandler(base http.Handler) (http.Handler, error)
}
type GRPCServer interface {
GetGRPCServerOptions() ([]grpc.ServerOption, error)
}
// Client 側
type HTTPClient interface {
GetHTTPRoundTripper(base http.RoundTripper) (http.RoundTripper, error)
}
type GRPCClient interface {
GetGRPCClientOptions() ([]grpc.DialOption, error)
}
Storage Extension
永続化ストレージ機能を提供する Extension です。Exporter の persistent_queue などで使用されます。
code:go
type Extension interface {
extension.Extension
GetClient(ctx context.Context, kind component.Kind, id component.ID, name string) (Client, error)
}
type Client interface {
Get(ctx context.Context, key string) ([]byte, error)
Set(ctx context.Context, key string, value []byte) error
Delete(ctx context.Context, key string) error
Batch(ctx context.Context, ops ...Operation) error
Close(ctx context.Context) error
}
ライフサイクル
Extension は Collector Service によって管理され、パイプラインを包み込む形でライフサイクルが設計されています。
起動・終了の全体像
code:md
┌───────────────────────────────────────────────────────────────────────┐
│ Collector Start │
├───────────────────────────────────────────────────────────────────────┤
│ 1. Extension.Start() ← 依存関係順にソート │
│ • StatusStarting → StatusOK │
│ │
│ 2. Extension.NotifyConfig() ← ConfigWatcher 実装のみ │
│ │
│ 3. Pipeline.StartAll() (Exporters → Processors → Receivers) │
│ │
│ 4. Extension.Ready() ← PipelineWatcher 実装のみ │
├───────────────────────────────────────────────────────────────────────┤
│ ~ 稼働中 ~ │
├───────────────────────────────────────────────────────────────────────┤
│ Collector Shutdown │
├───────────────────────────────────────────────────────────────────────┤
│ 1. Extension.NotReady() ← PipelineWatcher 実装のみ │
│ │
│ 2. Pipeline.ShutdownAll() (Receivers → Processors → Exporters) │
│ │
│ 3. Extension.Shutdown() ← 起動と逆順 │
│ • StatusStopping → StatusStopped │
└───────────────────────────────────────────────────────────────────────┘
Start / Shutdown 処理
code:go
func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
bes.telemetry.Logger.Info("Starting extensions...")
for _, extID := range bes.extensionIDs {
bes.reporter.ReportStatus(instanceID, componentstatus.NewEvent(componentstatus.StatusStarting))
if err := ext.Start(ctx, host); err != nil {
bes.reporter.ReportStatus(instanceID, componentstatus.NewPermanentErrorEvent(err))
return err
}
bes.reporter.ReportOKIfStarting(instanceID)
}
return nil
}
func (bes *Extensions) Shutdown(ctx context.Context) error {
bes.telemetry.Logger.Info("Stopping extensions...")
var errs error
// 逆順でシャットダウン
for i := len(bes.extensionIDs) - 1; i >= 0; i-- {
extID := bes.extensionIDsi bes.reporter.ReportStatus(instanceID, componentstatus.NewEvent(componentstatus.StatusStopping))
if err := ext.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, err)
continue
}
bes.reporter.ReportStatus(instanceID, componentstatus.NewEvent(componentstatus.StatusStopped))
}
return errs
}
設計のポイント
起動: Extension → Pipeline(Extension が Auth や Storage を提供するので先に起動)
終了: Pipeline → Extension(Pipeline がまだ Extension を使う可能性があるので後に終了)
依存関係の解決
extensioncapabilities.Dependent を実装した Extension は、トポロジカルソートによって依存順でソートされます:
code:go
func computeOrder(exts *Extensions) ([]component.ID, error) {
graph := simple.NewDirectedGraph()
// 全 Extension をノードとしてグラフに追加
for extID := range exts.extMap {
n := &node{nodeID: int64(len(nodes) + 1), extID: extID}
graph.AddNode(n)
}
// 依存関係をエッジとして追加
for extID, ext := range exts.extMap {
if dep, ok := ext.(extensioncapabilities.Dependent); ok {
for _, depID := range dep.Dependencies() {
if !ok {
return nil, fmt.Errorf("unable to find extension %s on which extension %s depends", depID, extID)
}
graph.SetEdge(graph.NewEdge(d, n)) // d → n (d が先に起動)
}
}
}
// トポロジカルソート
orderedNodes, err := topo.Sort(graph)
if err != nil {
return nil, cycleErr(err, topo.DirectedCyclesIn(graph)) // 循環依存検出
}
order := make([]component.ID, len(orderedNodes))
for i, n := range orderedNodes {
}
return order, nil
}
gonum.org/v1/gonum/graph/topo を使用
循環依存がある場合はエラーで検出
Shutdown は Start の逆順で実行
通知フック
Extension は Pipeline やコンポーネントの状態変化を監視できます
table:_
Interface メソッド 呼び出しタイミング 用途例
ConfigWatcher NotifyConfig() Extension 起動後、Pipeline 起動前 設定変更の監視
PipelineWatcher Ready() Pipeline 起動完了後 K8s readiness probe
PipelineWatcher NotReady() Pipeline 終了開始前 graceful shutdown 準備
componentstatus.Watcher ComponentStatusChanged() 任意のコンポーネントの状態変化時 ヘルスチェック
code:go
func (bes *Extensions) NotifyPipelineReady() error {
for _, extID := range bes.extensionIDs {
if pw, ok := ext.(extensioncapabilities.PipelineWatcher); ok {
if err := pw.Ready(); err != nil {
return fmt.Errorf("failed to notify extension %q: %w", extID, err)
}
}
}
return nil
}
func (bes *Extensions) NotifyComponentStatusChange(source *componentstatus.InstanceID, event *componentstatus.Event) {
for _, extID := range bes.extensionIDs {
if sw, ok := ext.(componentstatus.Watcher); ok {
sw.ComponentStatusChanged(source, event)
}
}
}
Receiver や Exporter から Extension にアクセスするには component.Host.GetExtensions() を使用します:
code:go
func (r *myReceiver) Start(ctx context.Context, host component.Host) error {
// 全ての Extension を取得
extensions := host.GetExtensions()
// 特定の Auth Extension を取得
if authExt, ok := ext.(extensionauth.Server); ok {
r.authServer = authExt
}
}
return nil
}
実装例
zPages Extension(デバッグ UI)
code:go
func (z *zpagesExtension) Start(ctx context.Context, host component.Host) error {
// zpages のスパンプロセッサを登録
zpages.NewSpanProcessor()
// HTTP サーバーを起動して /debug/tracez, /debug/expvarz を公開
z.server = &http.Server{...}
go z.server.ListenAndServe()
return nil
}
Memory Limiter Extension(ミドルウェア)
code:go
// GRPCServer と HTTPServer の両方を実装
func (ml *memoryLimiterExtension) GetHTTPHandler(base http.Handler) (http.Handler, error) {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if ml.memLimiter.MustRefuse() {
http.Error(w, "memory limit exceeded", http.StatusServiceUnavailable)
return
}
base.ServeHTTP(w, r)
}), nil
}
まとめ
Extentionを見ました。